[SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap #16844

davies wants to merge 4 commits into apache:master from davies/off_by_one
Conversation
cc @JoshRosen, @viirya
Test build #72541 has finished for PR 16844 at commit
LGTM
    isDefined = true;

-   if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+   if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
The re-allocated space might not be used if there is no further insertion. Shall we do growAndRehash at the beginning of append when numKeys == growthThreshold && !isDefined?
Unfortunately, we can't grow at the beginning of append, otherwise the pos will be wrong.
if it fails to grow once (stays at initial capacity).
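To make that concern concrete, here is a minimal, self-contained sketch (not Spark code; the class name and the `pos = hash & (capacity - 1)` slot computation are assumptions that mirror how an open-addressing map like BytesToBytesMap indexes its longArray) of why a pos computed before growing becomes stale:

```java
public class StalePosSketch {
  public static void main(String[] args) {
    int hash = 0xDEADBEEF;                     // some key hashcode
    int capacity = 64;                         // current capacity (power of 2)
    int posBefore = hash & (capacity - 1);     // slot found by lookup() before growing
    int posAfter = hash & (capacity * 2 - 1);  // slot the same key maps to after doubling
    // growAndRehash() re-inserts every key into a bigger array, so the two indices
    // generally differ; a pos computed before the grow would point at the wrong slot.
    System.out.println("pos before grow: " + posBefore + ", after grow: " + posAfter);
  }
}
```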
@viirya Addressed your comment, also fixed another bug (updated PR description).
Test build #72594 has finished for PR 16844 at commit
Test build #3566 has finished for PR 16844 at commit
    try {
      growAndRehash();
    } catch (OutOfMemoryError oom) {
      return false;
Unrelated, but this OutOfMemoryError will not be useful, at least not in YARN mode.
It will simply cause the JVM to exit.
        || !canGrowArray && numKeys > growthThreshold) {
      return false;
    if (numKeys >= growthThreshold) {
      if (longArray.size() / 2 == MAX_CAPACITY) {
This does not look correct as per the documentation of MAX_CAPACITY.
The actual number of keys can be == MAX_CAPACITY (so that the total number of entries in longArray is MAX_CAPACITY * 2).
We grow the array when numKeys >= growthThreshold, and growthThreshold = capacity * 0.5. But we actually allocate capacity * 2 entries for the array.
So numKeys < growthThreshold = capacity * 0.5 < array length = capacity * 2 should hold.
Because numKeys < growthThreshold is always true, if numKeys == MAX_CAPACITY, the capacity would be at least MAX_CAPACITY * 2 and the length of the array would be more than MAX_CAPACITY * 4.
But in allocate, there is an assert that capacity <= MAX_CAPACITY. Those conditions look inconsistent.
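For readers trying to keep the sizes straight, here is a small self-contained sketch of the quantities in this thread, under the assumptions stated above (growthThreshold = capacity * 0.5, longArray holds capacity * 2 longs); the class name is made up, and the MAX_CAPACITY value is quoted from the BytesToBytesMap javadoc as best recalled, not verified here:

```java
public class SizeInvariantsSketch {
  // BytesToBytesMap caps `capacity` (not numKeys) at 1 << 29; treat this exact value
  // as an assumption quoted from memory.
  static final int MAX_CAPACITY = 1 << 29;

  public static void main(String[] args) {
    int capacity = 64;                             // always a power of 2
    long longArraySize = (long) capacity * 2;      // two long entries per slot
    int growthThreshold = (int) (capacity * 0.5);  // grow once numKeys reaches this
    System.out.println("capacity=" + capacity
        + " longArray.size()=" + longArraySize
        + " growthThreshold=" + growthThreshold
        + " MAX_CAPACITY=" + MAX_CAPACITY);
    // Since longArray.size() == capacity * 2, the guard in the diff,
    // longArray.size() / 2 == MAX_CAPACITY, is just `capacity == MAX_CAPACITY`,
    // i.e. the map cannot double again.
    System.out.println("cannot grow further: " + (longArraySize / 2 == MAX_CAPACITY));
  }
}
```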
Also, we need to move the appropriate validation check into growAndRehash() and not here.
There are two reasons it can fail to grow: 1) the current capacity (longArray.size() / 2) has reached MAX_CAPACITY; 2) it can't allocate a new array (OOM).
So I think the check here is correct.
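A simplified, self-contained sketch of those two failure modes, roughly mirroring the shape of the diff above (canGrowArray, growAndRehash, and the simulated OutOfMemoryError are stand-ins for the real BytesToBytesMap members; the exact bookkeeping in the PR may differ):

```java
public class GrowFailureSketch {
  static final int MAX_CAPACITY = 1 << 29;   // assumed cap on capacity, as above
  static boolean canGrowArray = true;
  static long longArraySize = 64 * 2;        // stand-in for longArray.size()

  // Stand-in for BytesToBytesMap#growAndRehash(); here it simulates the memory
  // manager refusing to hand out a larger array.
  static void growAndRehash() {
    throw new OutOfMemoryError("Unable to acquire memory for a larger longArray");
  }

  static void maybeGrow(int numKeys, int growthThreshold) {
    if (numKeys >= growthThreshold) {
      if (longArraySize / 2 == MAX_CAPACITY) {
        canGrowArray = false;                // reason 1: capacity already at MAX_CAPACITY
      } else {
        try {
          growAndRehash();
        } catch (OutOfMemoryError oom) {
          canGrowArray = false;              // reason 2: not enough memory to grow
        }
      }
    }
  }

  public static void main(String[] args) {
    maybeGrow(32, 32);
    System.out.println("canGrowArray = " + canGrowArray);  // false: the OOM path was taken
  }
}
```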
@davies You are right that the check longArray.size() / 2 == MAX_CAPACITY is the upper bound beyond which we can't grow. It is simply confusing to do it outside growAndRehash, which is what threw me off.
Please move the check into growAndRehash() and have it return true in case it could successfully grow the map.
@mridulm Does that mean we should also rename growAndRehash to tryGrowAndRehash? I think that is not necessary.
The invariant in question belongs to growAndRehash(), not append, and as such should live there.
If code evolution causes grow to be invoked from elsewhere, the invariant would have to be duplicated everywhere.
Btw, this is in line with all the other data structures Spark (and other frameworks) have.
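For illustration, a hypothetical sketch of the refactor being suggested here (a grow method that owns the invariant and reports success); this is not what the PR ends up doing, and the fields below are simplified stand-ins for the real internals:

```java
public class BooleanGrowSketch {
  static final int MAX_CAPACITY = 1 << 29;       // assumed cap on capacity
  private long[] longArray = new long[64 * 2];   // stand-in for the real LongArray

  /** Returns true if the map grew; false if capacity is already at MAX_CAPACITY. */
  private boolean growAndRehash() {
    int currentCapacity = longArray.length / 2;
    if (currentCapacity == MAX_CAPACITY) {
      return false;                              // the invariant lives with the grow logic
    }
    longArray = new long[longArray.length * 2];  // re-hashing of existing keys omitted
    return true;
  }

  public static void main(String[] args) {
    BooleanGrowSketch m = new BooleanGrowSketch();
    boolean grew = m.growAndRehash();
    // append() would then simply do: if (!growAndRehash()) canGrowArray = false;
    System.out.println("grew=" + grew + ", new capacity=" + m.longArray.length / 2);
  }
}
```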
    longArray.set(pos * 2 + 1, keyHashcode);
    isDefined = true;

    if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
This longArray.size() < MAX_CAPACITY looks like the wrong condition.
longArray.size() is the next capacity under the current growth strategy, so it should be longArray.size() <= MAX_CAPACITY.
        || !canGrowArray && numKeys > growthThreshold) {
      return false;
    if (numKeys >= growthThreshold) {
      if (longArray.size() / 2 == MAX_CAPACITY) {
Is MAX_CAPACITY still the maximum number of keys, as per its documentation? If the capacity can be at most longArray.size() / 2 == MAX_CAPACITY, then the actual numKeys should be MAX_CAPACITY / 2, because we need two long array entries per key, right?
@davies is correct; but it is a slightly unintuitive way to write the condition.
val currentSize = longArray.size()
val newSize = currentSize * 2
val currentKeysLen = currentSize / 2
val newKeysLen = currentKeysLen * 2
if (newKeysLen > MAX_CAPACITY) then fail.
That is, if (currentKeysLen == MAX_CAPACITY) then fail // since we allow only powers of 2 for all these values.
That is, if (longArray.size() / 2 == MAX_CAPACITY).
Particularly given its location (in append as opposed to grow), it ends up being a bit more confusing than expected.
If the currentKeysLen above is the number of keys, it never equals currentSize / 2. currentSize / 2 is actually the capacity we want to allocate (though we actually allocate double that for the array).
Once the number of keys reaches growthThreshold (i.e., capacity * 0.5), we either grow the array or fail the append. So the number of keys is always less than or equal to capacity * 0.5, which is currentSize * 0.5 * 0.5.
To clarify, the lengths @davies and I mentioned are not the actual number of keys in the map, but the maximum number of keys possible in the map.
Test build #3571 has finished for PR 16844 at commit
    // then we don't try to grow again if hit the `growthThreshold`.
        || !canGrowArray && numKeys > growthThreshold) {
      return false;
    if (numKeys >= growthThreshold) {
I think we need to grow the array only if isDefined == false.
…ever grow" This reverts commit d9aa208.
Test build #72946 has finished for PR 16844 at commit
retest this please.
Test build #72977 has finished for PR 16844 at commit
    // The map could be reused from last spill (because of no enough memory to grow),
    // then we don't try to grow again if hit the `growthThreshold`.
-       || !canGrowArray && numKeys > growthThreshold) {
+       || !canGrowArray && numKeys >= growthThreshold) {
This change makes sense to me because growthThreshold's Scaladoc says "The map will be expanded once the number of keys exceeds this threshold" and here we're considering the impact of adding an additional key (so this could have also been written as (numKeys + 1) > growthThreshold).
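A tiny worked example of the off-by-one (numbers are made up; growthThreshold = capacity * 0.5 with capacity = 64, and the map can no longer grow, i.e. canGrowArray is false): with the old `>` guard, an append attempted when numKeys already equals 32 is still allowed, leaving 33 keys, one more than the half of capacity that radix sort relies on; the new `>=` guard rejects it at 32. A sketch:

```java
public class OffByOneSketch {
  public static void main(String[] args) {
    int growthThreshold = 32;   // capacity 64 * 0.5, per the discussion above
    for (int numKeys = 31; numKeys <= 33; numKeys++) {
      boolean oldGuardRejects = numKeys > growthThreshold;   // before this PR
      boolean newGuardRejects = numKeys >= growthThreshold;  // after this PR
      System.out.println("numKeys=" + numKeys
          + " oldGuardRejects=" + oldGuardRejects
          + " newGuardRejects=" + newGuardRejects);
    }
    // At numKeys == 32 the old guard still lets the append through, so the map can
    // end up holding 33 keys when it cannot grow; the new guard stops at 32.
  }
}
```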
Merging into master, 2.1, and 2.0 branches.
## What changes were proposed in this pull request?

Radix sort requires that half of the array is free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap will not hold more items than 1/2 of its capacity. It turned out this is not true: the current implementation of append() could leave one more item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1).

This PR fixes the off-by-one bug in BytesToBytesMap.

This PR also fixes a bug, introduced by #15722, where the array would never grow if it failed to grow once (staying at the initial capacity).

## How was this patch tested?

Added regression test.

Author: Davies Liu <davies@databricks.com>

Closes #16844 from davies/off_by_one.

(cherry picked from commit 3d0c3af)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
What changes were proposed in this pull request?

Radix sort requires that half of the array is free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap will not hold more items than 1/2 of its capacity. It turned out this is not true: the current implementation of append() could leave one more item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1).

This PR fixes the off-by-one bug in BytesToBytesMap.

This PR also fixes a bug, introduced by #15722, where the array would never grow if it failed to grow once (staying at the initial capacity).

How was this patch tested?

Added regression test.